Cloudflare のメッセージキューサービス Queues を使ってみた。
Cloudflare Queues のオープンベータ版が公開されていたので触ってみました。
Cloudflare Queues とは
Cloudflare が提供するグローバルメッセージキューサービスで、メッセージの配信とバッチ処理を提供します。
Cloudflare Queues は Cloudflare Workers と統合されており、後述するように Workers から メッセージの送信と取得を操作します。
例えば複数の Workers 間でデータのやり取りが必要な場合、負荷に応じたバッファリングやバッチ処理、それによる処理の確実性の向上、またサービスコンポーネントを分離する役割などが期待できます。
エグレスやスループットに対しては料金が発生せず、オペレーション(キューの読み書き)の合計数に応じて課金されます。
Cloudflare Queues を触ってみる
ざっくり利用の流れです。
- Queues の有効化と作成
- Wrangler のインストール
- Worker プロジェクトの作成
- データプロデューサー側の Worker の作成
- データコンシューマー側の Worker の作成
1. Queues の有効化
Cloudflare ダッシュボードから新規キューを作成します。
※まだ Queues 有効化前であれば、 [Enable] のようなボタンをクリック
[sample-queue] という名前で作成してみました。いったん [inactive] ステータスのままで大丈夫です。
2. Wrangler のインストール
Wrangler は Workers をローカルから開発/デプロイする CLI ツールです。
Queue を利用するには Wrangler CLI のバージョン 2.2.1
以降がインストールされている必要があります。
▼未インストールの方
$ npm install -g wrangler もしくは $ yarn global add wrangler
▼既にインストール済みの方
$ wrangler --version 2.2.1
上記でバージョンが 2.2.1 未満であれば…
$ npm update -g wrangler
アカウント認証のためにログインします。
$ wrangler login
ブラウザが開き、認証が完了したら下記のような画面になります。
3. Worker プロジェクトの作成
サンプルのプロジェクトを作成します。.git
を作成するかなど色々聞かれますが、全部 y (Yes) で問題ないです。
$ wrangler init sample-worker-for-queues ✨ Initialized git repository at sample-worker-for-queues No package.json found. Would you like to create one? (y/n) ✨ Created sample-worker-for-queues/package.json Would you like to use TypeScript? (y/n) ✨ Created sample-worker-for-queues/tsconfig.json Would you like to create a Worker at sample-worker-for-queues/src/index.ts? None ❯ Fetch handler Scheduled handler
Workers から Queues を操作するため、Cloudflare においてバイディングとよばれる設定が必要です。これは特定のキューを Worker に接続して呼び出せるようにするための操作です。
先ほどの init で作成されたディレクトリ sample-worker-for-queues
の直下に wrangler.toml
があるはずですので、そちらに下記を追記します。
[[queues.producers]] queue = "sample-queue(先ほど作成したキューの名前)" binding = "SAMPLE_QUEUE" [[queues.consumers]] queue = "sample-queue"
4. データプロデューサー側の Worker の作成
プロデューサーとは、キューにメッセージを発行するクライアントを指します。
今回は試しにタイムスタンプをキューに送信するサンプルを作成してみます。
src/index.ts
を書き換えます。
export interface Env { SAMPLE_QUEUE: Queue, } export default { async fetch( request: Request, env: Env, ctx: ExecutionContext ): Promise<Response> { const timestamp_now = Date.now().toString() await env.SAMPLE_QUEUE.send({timestamp: timestamp_now}); return new Response(timestamp_now); }, };
バインディング上で [キュー名].send()
を呼び出すことで、該当のキューにメッセージを書き込めます。
なお、1つのキューは複数のプロデューサーを持つことができます。1つのキューに書き込めるプロデューサー(= Worker)の総数に制限はありません。
逆に、複数のキューを単一の Worker にバインドすることも可能とのことです。
またメッセージの中身については、シリアライズ可能なオブジェクトなら何でも発行できるらしく(Any serializable object can be published to a queue)、上記の例だとシンプルな JSON を渡しています。
Cloudflare に Workers を公開します(※現状 wrangler dev
では Queues は動きません)。
$ wrangler publish
ブラウザで Workers の URL を開いて、タイムスタンプが表示されていたらOKです。
なお、デバッグには wrangler tail が便利です↓ https://developers.cloudflare.com/workers/wrangler/commands/#tail
5. データコンシューマー側の Worker の作成
先ほどはキューにメッセージを書き込む処理でしたので、次に書き込まれたメッセージを取得するコンシューマーのコードを追加します。
export interface Env { SAMPLE_QUEUE: Queue, } export default { async fetch( request: Request, env: Env, ctx: ExecutionContext ): Promise { const timestamp_now = Date.now().toString() await env.SAMPLE_QUEUE.send({sample_timestamp: timestamp_now}); return new Response(timestamp_now); }, async queue( batch: MessageBatch, env: Env, ctx: ExecutionContext ): Promise { let messages = JSON.stringify(batch.messages) console.log("Received", messages) } };
先ほどのプロデューサーと異なり、各キューにつきアクティブなコンシューマーは1つまでになっており、これにより「少なくとも1回の配信」の要件を実現し、重複メッセージのリスクを最小化しているとのことです。
逆に、1つのコンシューマーからは複数のキューを使用できます。
上記の Worker を実行して $ wrangler tail
でログを見るとこんな感じ↓(各メッセージは自動でIDとタイムスタンプを持つので、明示的に timestamp を持たせた今回の例は少し分かりづらかった、すみません…)
(log) Received [{"body":{"sample_timestamp":"1671731104378"},"timestamp":"2022-12-22T17:45:04.466Z","id":"e10ffb69c9456d970ef53f4a9a2ddcc9"},{"body":{"sample_timestamp":"1671731104700"},"timestamp":"2022-12-22T17:45:04.729Z","id":"eccc41d2d3780e57e75eb8ec1d548dfa"}]
以上、簡単にメッセージの配信と取得を行ってみました。
Queues の オプション設定
wrangler.toml のコンシューマーからバッチ処理の設定が可能です。
[[queues.consumers]] queue = "sample-queue" max_batch_size = 10 max_batch_timeout = 30 max_retries = 10 dead_letter_queue = "sample-queue-dlq"
下記はいずれもオプショナルな設定です。
- max_batch_size … 各バッチで許可されるメッセージの最大数(最大100)。
- max_batch_timeout … バッチが一杯になるまでの最大待ち時間の秒数(最大30秒)。
- max_retries … 何度まで再試行するかの回数(最大100)。
- dead_letter_queue … 後述
デッドレターキューの処理
Queues でもデッドレターキューの処理が可能です。wrangler.toml からオプションとして設定し、処理が少なくとも max_retries 回失敗した場合にメッセージを送信する別の Queue の名前を指定できます。指定された名前の Queue がまだ存在しない場合は自動的に作成されます。 このオプションが定義されていない場合、処理に何度も失敗するメッセージは最終的に廃棄されます。
最後に
Cloudflare R2 も リリースされたので、組み合わせの例が色々思い付きますね。 例えば Worker の実行/エラーログを R2 に保存する際に Queues を通じて配信する処理などが簡単に、かつ1つの Worker で書けるのは嬉しいです。
今後のロードマップには FIFO(first-in first-out)なども入っているとのことで非常に楽しみです。
なお、Queues の利用にあたっては現状いくつか制限がありますのでご注意ください。
- 1アカウントあたりの最大 Queus: 10
- 最大メッセージサイズ: 128 KB
- スループット: 100メッセージ/秒
- 最大保持期間: 4日間(96時間)